package org.databandtech.job.jobs;

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;
import org.databandtech.job.entity.JdbcToHdfsS2;

/**
Data aggregation into the OLAP analytics warehouse. First-time full-load initialization task. (Note: this description covers the reverse direction too — importing statistical results from HDFS or Hive into an OLAP database such as MySQL or Oracle is handled by HdfsToJdbcFullJob.)

Before use, the sqoop2 server site and links must be created first. References:
http://sqoop.apache.org/docs/1.99.7/user.html
http://sqoop.apache.org/docs/1.99.7/dev.html
sqoop connector：
+------------------------+---------+------------------------------------------------------------+----------------------+
| generic-jdbc-connector | 1.99.7  | org.apache.sqoop.connector.jdbc.GenericJdbcConnector       | FROM/TO              |
| kite-connector         | 1.99.7  | org.apache.sqoop.connector.kite.KiteConnector              | FROM/TO              |
| oracle-jdbc-connector  | 1.99.7  | org.apache.sqoop.connector.jdbc.oracle.OracleJdbcConnector | FROM/TO              |
| hdfs-connector         | 1.99.7  | org.apache.sqoop.connector.hdfs.HdfsConnector              | FROM/TO              |
| ftp-connector          | 1.99.7  | org.apache.sqoop.connector.ftp.FtpConnector                | TO                   |
| kafka-connector        | 1.99.7  | org.apache.sqoop.connector.kafka.KafkaConnector            | TO                   |
| sftp-connector         | 1.99.7  | org.apache.sqoop.connector.sftp.SftpConnector              | TO                   |
+------------------------+---------+------------------------------------------------------------+----------------------+
*
*Database connection:
Driver class <linkConfig.jdbcDriver> null
Connection String <linkConfig.connectionString> null
Username <linkConfig.username> null
Password <linkConfig.password> null
Fetch Size <linkConfig.fetchSize> null
Connection Properties <linkConfig.jdbcProperties> null

SQL Dialect:
Identifier enclose <dialect.identifierEnclose> null

Database source:
Schema name <fromJobConfig.schemaName> null
Table name <fromJobConfig.tableName> null
SQL statement <fromJobConfig.sql> null
Column names <fromJobConfig.columnList> null
Partition column <fromJobConfig.partitionColumn> null
Partition column nullable <fromJobConfig.allowNullValueInPartitionColumn> null
Boundary query <fromJobConfig.boundaryQuery> null

Incremental read:
Check column <incrementalRead.checkColumn> null
Last value <incrementalRead.lastValue> null

Database target:
Schema name <toJobConfig.schemaName> null
Table name <toJobConfig.tableName> null
Column names <toJobConfig.columnList> null
Staging table <toJobConfig.stageTableName> null
Clear stage table <toJobConfig.shouldClearStageTable> null

*/
public class JdbcToHdfsSqoop2Job {

	/** Sqoop2 REST server endpoint. */
	static final String SQOOPURL = "http://hadoop001:12000/sqoop/";

	/** User that owns the HDFS link. */
	static final String USER = "root";
	// SECURITY NOTE(review): database credentials are hard-coded; move them to
	// external configuration or a secret store before production use.
	static final String DBUSER = "root";
	static final String DBPASSWORD = "mysql";

	/** JDBC connection string of the source MySQL database. */
	static final String CONNECTION = "jdbc:mysql://192.168.13.66:3307/databand?useUnicode=true&characterEncoding=utf-8&useSSL=false";
	/** Target HDFS namenode URI. */
	static final String HDFSURL = "hdfs://hadoop001:8020";
	/** Hadoop configuration directory on the Sqoop2 server host. */
	static final String HADOOPCONF = "/usr/app/hadoop-2.9.2/etc/hadoop/";

	/**
	 * Entry point: builds a full-load JDBC -> HDFS Sqoop2 job from a
	 * {@link JdbcToHdfsS2} model and saves it on the Sqoop2 server.
	 * Link creation, job start and inspection calls are kept commented out
	 * below as one-off setup/operations steps.
	 */
	public static void main(String[] args) {

		SqoopClient client = new SqoopClient(SQOOPURL);

		// ###### All resources must first be created with the code below, or via the sqoop2 shell ######
		// Create links.
		// If you get JDBCREPO_0021 "Given link name is in use", the link already exists;
		// to delete a link you must first delete the jobs that use it.
		//SqoopUtils.deleteLink(client,"mysqllink");
		//SqoopUtils.createjdbcLink(client,"mysqllink",CONNECTION,USER,DBUSER,DBPASSWORD);
		//SqoopUtils.createhdfsLink(client,"hdfslink2020","root",HDFSURL,HADOOPCONF);

		// Inspect connector config keys.
		//SqoopUtils.displayConfig(client,"generic-jdbc-connector");
		//SqoopUtils.displayConfig(client,"hdfs-connector");
		// List all links.
		//SqoopUtils.showAllLinks(client);
		// List all connectors.
		//SqoopUtils.showAllConnectors(client);
		// List all jobs.
		//SqoopUtils.showAllJobs(client);

		// Job definition, full load.
		JdbcToHdfsS2 model = new JdbcToHdfsS2();
		model.setJobName("product_order_count_by_cate"); // named after the metric
		model.setFromLink("mysqllink");
		model.setToLink("hdfslink2020");
		model.setFromschemaName("databand");
		model.setFromTable("city");
		model.setFull(true);
		model.setHdfsOutputDirectory("/databand1/city1");
		model.setNumExtractors("3");
		model.setPartitionColumn("CountryCode");

		createJdbcToHdfsJob(client, model);

		//startJob(client,"product_order_count_by_cate");
		//checkJob(client, "product_order_count_by_cate");

		//client.stopJob("product_order_count_by_cate_FULL");
		//client.deleteJob("product_order_count_by_cate_FULL2020");
	}

	/**
	 * Starts the named job on the Sqoop2 server and prints its submission
	 * status, progress, external (YARN) job id/link and counters, if any.
	 *
	 * @param client connected Sqoop2 client
	 * @param jobname name of an existing Sqoop2 job
	 */
	private static void startJob(SqoopClient client, String jobname) {
		MSubmission submission = client.startJob(jobname);
		System.out.println("Job Submission Status : " + submission.getStatus());
		// getProgress() returns -1 while progress is not yet known.
		if (submission.getStatus().isRunning() && submission.getProgress() != -1) {
			System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
		}
		System.out.println("job id :" + submission.getExternalJobId());
		System.out.println("Job link : " + submission.getExternalLink());
		Counters counters = submission.getCounters();
		if (counters != null) {
			System.out.println("Counters:");
			for (CounterGroup group : counters) {
				System.out.print("\t");
				System.out.println(group.getName());
				for (Counter counter : group) {
					System.out.print("\t\t");
					System.out.print(counter.getName());
					System.out.print(": ");
					System.out.println(counter.getValue());
				}
			}
		}
	}

	/**
	 * Prints the progress of a currently running job, if available.
	 *
	 * @param client connected Sqoop2 client
	 * @param jobname name of an existing Sqoop2 job
	 */
	private static void checkJob(SqoopClient client, String jobname) {
		MSubmission submissionStatus = client.getJobStatus(jobname);
		if (submissionStatus.getStatus().isRunning() && submissionStatus.getProgress() != -1) {
			System.out.println("Progress : " + String.format("%.2f %%", submissionStatus.getProgress() * 100));
		}
	}

	/**
	 * Creates and saves a Sqoop2 job that copies a JDBC table to an HDFS
	 * directory, using the links and settings carried by {@code model}.
	 *
	 * @param client connected Sqoop2 client
	 * @param model job definition (links, schema/table, output dir, extractors)
	 */
	private static void createJdbcToHdfsJob(SqoopClient client, JdbcToHdfsS2 model) {
		MJob job = client.createJob(model.getFromLink(), model.getToLink());
		job.setName(model.getJobName());
		job.setCreationUser("sqoop2");
		// "FROM" side: generic-jdbc-connector config.
		MFromConfig fromJobConfig = job.getFromJobConfig();
		fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue(model.getFromschemaName()); // database name
		fromJobConfig.getStringInput("fromJobConfig.tableName").setValue(model.getFromTable()); // table name
		fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue(model.getPartitionColumn()); // split column
		// "TO" side: hdfs-connector config.
		MToConfig toJobConfig = job.getToJobConfig();
		toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue(model.getHdfsOutputDirectory());
		// Driver config: number of parallel extractors (map tasks).
		// throttlingConfig.numExtractors is an Integer input in Sqoop2, so the
		// model's String value must be parsed; skipped when unset.
		MDriverConfig driverConfig = job.getDriverConfig();
		if (model.getNumExtractors() != null && !model.getNumExtractors().isEmpty()) {
			driverConfig.getIntegerInput("throttlingConfig.numExtractors")
					.setValue(Integer.parseInt(model.getNumExtractors()));
		}

		Status status = client.saveJob(job);
		if (status.canProceed()) {
			System.out.println("Created Job with Job Name: " + job.getName());
		} else {
			// Include the status so the failure reason is not lost.
			System.out.println("Something went wrong creating the job, status: " + status);
		}
	}

}
